﻿using Gasgoo.ShareLibrary.Framework.Global;
using Gasgoo.ShareLibrary.Framework.Options;
using Orleans;
using Orleans.Streams;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace Gasgoo.ShareLibrary.Framework.iQueueProvider
{
    public interface IQueueProducer<T>
    {
        Task SendMessageAsync(T message);
        Task SendMessageAsync(List<T> message);
    }

    public class QueueProducerHandler<T> : IQueueProducer<T>
    {
        private Subject<BatchItemOptions<T>> _subject;
        private IAsyncStream<T> stream;
        private bool isBatchSubmit;

        public QueueProducerHandler(string topic, string streamNamespace) : this(
            new DefaultQueueProvider(ServiceManage.Get<IClusterClient>(Constants.HOST_SERVICE_PROVIDER).GetStreamProvider(Constants.SMS_PROVIDER)),
            topic,
            streamNamespace)
        {
        }

        /// <summary>
        /// 生产者
        /// </summary>
        /// <param name="cluster">集群ID</param>
        /// <param name="topic">订阅主题</param>
        /// <param name="streamNamespace">流空间</param>
        ///// <param name="providerName">流提供商Title</param>
        /// <param name="isBatchSubmit">是否批量提交：默认为-Ture，每200ms or 200/条提交一次。</param>
        public QueueProducerHandler(IQueueProvider provider, string topic, string streamNamespace)
        {
            Guid streamId = GasgooUtils.ToGuid(topic);
            this.stream = provider.GetStream<T>(streamId, streamNamespace);

            this.isBatchSubmit = provider.Name != Constants.SMS_PROVIDER;
            if (isBatchSubmit)
            {
                _subject = new Subject<BatchItemOptions<T>>();
                // 200毫秒 或者 200 条
                _subject.Buffer(TimeSpan.FromMilliseconds(10), 200)
                    .Where(x => x.Count > 0)
                    .Select(list => Observable.FromAsync(() => SendMessageAsync(list)))
                    .Concat()
                    .Subscribe();
            }

        }

        /// <summary>
        /// 配置批量提交参数
        /// </summary>
        /// <param name="minSubmitTime">最小提交时间</param>
        /// <param name="maxCount">批最多消息数</param>
        public void ConfigBatchOptions(TimeSpan minSubmitTime, int maxCount = 200)
        {
            if (isBatchSubmit)
            {
                _subject.Dispose();
                _subject = new Subject<BatchItemOptions<T>>();
                _subject.Buffer(minSubmitTime, maxCount)
                    .Where(x => x.Count > 0)
                    .Select(list => Observable.FromAsync(() => SendMessageAsync(list)))
                    .Concat()
                    .Subscribe();
            }
        }

        /// <summary>
        /// 注：次设置废弃。
        /// 设置批提交状态；
        /// 如果需要严格消费顺序，在消费者消费成功情况下只消费一次。
        /// 可以关闭批提交，但是次操作影响性能
        /// </summary>
        [Obsolete("如有严格消费顺序，建议使用Service方案。以单线程模型的高速队列会比使用流方案效率更高。框架提供分布式计算模型，以避免单机瓶颈导致的种种问题")]
        public void SetBatchValue(bool isBatchSubmit)
        {
            this.isBatchSubmit = isBatchSubmit;
        }

        public async Task SendMessageAsync(T message)
        {
            if (isBatchSubmit)
            {
                // 以下至少消费一次，可能会出现重复消费
                TaskCompletionSource<Task> source = new TaskCompletionSource<Task>();
                _subject.OnNext(new BatchItemOptions<T>
                {
                    Body = message,
                    TaskSource = source
                });
                await source.Task;
            }
            else
            {
                await this.stream.OnNextAsync(message);
            }
        }

        public async Task SendMessageAsync(List<T> messages)
        {
            await this.stream.OnNextBatchAsync(messages);
        }

        private async Task SendMessageAsync(IList<BatchItemOptions<T>> messages)
        {
            try
            {
                Console.WriteLine("SendMessageAsync - Count:{0}", messages.Count);
                await this.stream.OnNextBatchAsync(messages.Select(item => item.Body));

                foreach (var item in messages)
                {
                    item.TaskSource.TrySetResult(Task.CompletedTask);
                }
            }
            catch (Exception ex)
            {
                foreach (var item in messages)
                {
                    item.TaskSource.TrySetException(ex);
                }
            }
        }

    }
}
